package com.cdgeekcamp.redas.recr_page_worker;

import com.cdgeekcamp.redas.db.model.CompanyRepository;
import com.cdgeekcamp.redas.lib.core.RedasException;
import com.cdgeekcamp.redas.lib.core.api.receivedParameter.RecrPage;
import com.cdgeekcamp.redas.lib.core.jsonObject.JsonObject;
import com.cdgeekcamp.redas.recr_page_worker.component.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Component
public class RecrPageWorker implements CommandLineRunner {
    @Autowired
    private CompanyRepository companys;

    @Autowired
    private RedasMqConfig redasMqConfig;

    @Autowired
    private CompanyBuildServiceImpl companyBuildService;

    @Autowired
    private EmployeeBuildServiceImpl employeeBuildService;

    @Autowired
    private PositionBuildServiceImpl positionBuildService;

    @Autowired
    private PositionAdvantageBuildServiceImpl positionAdvantageBuildService;

    @Autowired
    private PositionTagBuildServiceImpl positionTagBuildService;

    @Autowired
    private NatureBuildServiceImpl natureBuildService;

    @Autowired
    private RecrPageStateBuildServiceImpl recrPageStateBuildService;

    @Autowired
    private RecrPageStorageBuildServiceImpl recrPageMetaDataBuildService;

    @Autowired
    private JpaTransactionManager transactionManager;

    @Autowired
    private RedasRecrPageWorkerConfiguration redasRecrPageWorkerConfiguration;

    /**
     * 将招聘数据入库并带有数据库事务
     *
     * @param recrPage          招聘数据对象
     * @param recrPageStorageId 招聘数据存储编号
     */
    private void processMessageWithTransactional(
            RecrPage recrPage, Integer recrPageStorageId) throws RedasException {
        Integer companyId = companyBuildService.build(recrPage, 0);
        Integer employeeId = employeeBuildService.build(recrPage, companyId);
        Integer positionId = positionBuildService.build(recrPage, companyId, employeeId);

        natureBuildService.build(recrPage, companyId);
        positionAdvantageBuildService.build(recrPage, positionId);
        positionTagBuildService.build(recrPage, positionId);

        // 入库成功，设置状态标记
        recrPageStateBuildService.build(recrPage, recrPageStorageId, positionId);
    }

    /**
     * 处理消费者接收到的JSON格式消息，按照数据规则分表入库
     *
     * @param message JSON格式的消息
     */
    private void processMessage(String message) throws RedasException, NoSuchAlgorithmException {
        JsonObject recrPageJson = new JsonObject();
        RecrPage recrPage = recrPageJson.fromJson(message, RecrPage.class);

        // 优先保存元数据，不管之后的数据库事务是否成功
        // 如果事务成功，在RecrPageState表中会存在记录
        Integer recrPageStorageId = recrPageMetaDataBuildService.build(recrPage, 0);

        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别，开启新事务，这样会比较安全些。
        TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态

        try {
            processMessageWithTransactional(recrPage, recrPageStorageId);
            transactionManager.commit(status);
        } catch (Exception e) {
            transactionManager.rollback(status);
        }
    }

    @Override
    public void run(String... args) throws Exception {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, redasMqConfig.getHost());
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, redasMqConfig.getGroup());

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);

        try (kafkaConsumer) {
            kafkaConsumer.subscribe(Collections.singletonList(redasMqConfig.getTopic()));

            // 以下注释用来消除 while 语句警告提示
            //noinspection InfiniteLoopStatement
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic:%s,offset:%d,消息:%s%n",
                            record.topic(), record.offset(), record.value());

                    processMessage(record.value());
                }
            }
        }
    }
}
